package com.tcl.kafka.core;

import com.tcl.kafka.config.KafkaStartPauseProperties;
import com.tcl.kafka.constant.StartPauseConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.util.StringUtils;

import java.util.Objects;

/**
 * kafka服务操作类
 * @author liangxi.zeng
 */
@Slf4j
public class KafkaService {

    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    public KafkaService(KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry
            ,KafkaStartPauseProperties kafkaStartPauseProperties) {
        this.kafkaListenerEndpointRegistry = kafkaListenerEndpointRegistry;
        initStartPause(kafkaStartPauseProperties);
    }

    /**
     * 初始化启停配置
     */
    private void initStartPause(KafkaStartPauseProperties kafkaStartPauseProperties) {
        startMultiByComma(kafkaStartPauseProperties.getStartListener());
        pauseMultiByComma(kafkaStartPauseProperties.getPauseListener());
    }

    /**
     * 暂停消费
     * @param pauseValue
     */
    public void pauseMultiByComma(String pauseValue) {
        if(!StringUtils.isEmpty(pauseValue)) {
            String[] pauseListenerIds = pauseValue.split(StartPauseConstant.SPLIT);
            for(String pauseListenerId:pauseListenerIds) {
                pauseListenerId(pauseListenerId);
            }
        }
    }

    /**
     * 恢复消费
     * @param startValue
     */
    public void startMultiByComma(String startValue) {
        if(!StringUtils.isEmpty(startValue)) {
            String[] startListenerIds = startValue.split(StartPauseConstant.SPLIT);
            for(String startListenerId:startListenerIds) {
                startListenerId(startListenerId);
            }
        }
    }

    /**
     * 开启消费
     * @param listenerId
     */
    public void startListenerId(String listenerId) {
        MessageListenerContainer messageListenerContainer = kafkaListenerEndpointRegistry
                .getListenerContainer(listenerId);
        if(Objects.nonNull(messageListenerContainer)) {
            if(!messageListenerContainer.isRunning()) {
                messageListenerContainer.start();
            } else {
                if(messageListenerContainer.isContainerPaused()) {
                    log.info("listenerId:{},恢复",listenerId);
                    messageListenerContainer.resume();
                }
            }
        }
    }

    /**
     * 停止消费
     * @param listenerId
     */
    public void pauseListenerId(String listenerId) {
        MessageListenerContainer messageListenerContainer = kafkaListenerEndpointRegistry
                .getListenerContainer(listenerId);
        if(Objects.nonNull(messageListenerContainer) && !messageListenerContainer.isContainerPaused()) {
            log.info("listenerId:{},暂停",listenerId);
            messageListenerContainer.pause();
        }
    }
}
